from django.test import TestCase

from cytasks.tasks import task2, task3, task4


# Celery task tests
class CytaskTests(TestCase):
    """Dispatch the ``cytasks`` Celery tasks and print the resulting task ids.

    These tests make no assertions; each one only sends a task and prints
    the id of the returned result object. Run a single test from the
    terminal with, for example::

        python manage.py test cytasks.tests.CytaskTests.test_task2
    """

    def test_task2(self):
        """Send ``task2`` with two positional arguments and print its id.

        Run with:
            python manage.py test cytasks.tests.CytaskTests.test_task2
        """
        # delay() dispatches asynchronously; it is the shorthand form of
        # apply_async().
        task_id = task2.delay('arg1', 'arg2').id
        print(task_id)

    def test_task3(self):
        """Print the name of ``task3``, then send it and print its id.

        Run with:
            python manage.py test cytasks.tests.CytaskTests.test_task3
        """
        task_name = task3.name  # the task's name
        print(task_name)
        task_id = task3.apply_async().id
        print(task_id)

    def test_task4(self):
        """Send ``task4`` without arguments and print its id.

        Run with:
            python manage.py test cytasks.tests.CytaskTests.test_task4
        """
        task_id = task4.apply_async().id
        print(task_id)
